iT邦幫忙

2022 iThome 鐵人賽

DAY 14
2

網路連結

為了使節點 (ServiceBrokers) 能夠互相通訊連結,需要設置一個 Transporter 來處理通訊傳輸事件。Moleculer 支援常見的 Transporter 來作為中央的訊息 Broker,可以透過發布/訂閱的方式進行訊息傳遞,提供遠端節點一個可靠的資料交換方式。


Fig. 1. Message Broker

Transporter

微服務通常會建立在多個節點,因此 Transporter 模組就顯得相當重要。當 Transporter 與其它節點進行通訊的時,它會傳輸事件、請求呼叫及響應處理等。若節點實例在多個不同節點上運作時,它將會根據規則進行節點間的負載平衡。由於運算邏輯與 Transporter 模組是分開的,因此 Transporter 是可替換的,它不會影響到程式碼的邏輯開發。

以下將介紹 Moleculer 框架中內建的 Transporter,其中細部設定筆者未測試或確認過的部分,本文將暫時保留官方提供的範例註解說明。

TCP Transporter

TCP 是一個無需相依套件,不須設定的 Transporter 。它使用 Gossip 協定[2] 來傳播節點狀態、服務清單及 Heartbeat 機制。它包含了一個整合的 UDP 探索功能,可以用來偵測網路上新增的節點或是離線的節點。

範例:快速使用

moleculer.config.js

module.exports = {
    transporter: "TCP"
};

範例:完整的選項預設值

moleculer.config.js

module.exports = {
    logger: true,
    transporter: {
        type: "TCP",
        options: {
            // 啟用 UDP 探索
            udpDiscovery: true,
            // Reusing UDP server socket
            udpReuseAddr: true,

            // UDP port
            udpPort: 4445,
            // UDP bind address (if null, bind on all interfaces)
            udpBindAddress: null,
            // UDP 發送間隔 (秒)
            udpPeriod: 30,

            // Multicast address.
            udpMulticast: "239.0.0.0",
            // Multicast TTL setting
            udpMulticastTTL: 1,

            // Send broadcast (Boolean, String, Array<String>)
            udpBroadcast: false,

            // TCP server port. Null or 0 means random port
            port: null,
            // Static remote nodes address list (when UDP discovery is not available)
            urls: null,
            // Use hostname as preffered connection address
            useHostname: true,

            // Gossip sending period in seconds
            gossipPeriod: 2,
            // Maximum enabled outgoing connections. If reach, close the old connections
            maxConnections: 32,
            // Maximum TCP packet size
            maxPacketSize: 1 * 1024 * 1024            
        }
    }
};

範例:陣列清單方式設定靜態節點列表

如果你的網路環境不允許使用 UDP 的話,請在 urls 選項提供節點清單(Host/IP 、連接埠、節點 ID)。

moleculer.config.js

module.exports = {
    nodeID: "node-1",
    logger: true,
    transporter: {
        type: "TCP",
        options: {
            udpDiscovery: false,
            urls: [
                "172.17.0.1:6000/node-1",
                "172.17.0.2:6000/node-2",
                "172.17.0.3:6000/node-3"                
            ],
        }
    }
};

範例:逗號分隔方式設定節點列表,注意開頭需要加上 tcp://

moleculer.config.js

module.exports = {
    nodeID: "node-1",
    transporter: "tcp://172.17.0.1:6000/node-1,172.17.0.2:6000/node-2,172.17.0.3:6000/node-3"
};

範例:使用 JSON 檔案設定節點列表

moleculer.config.js

module.exports = {
    nodeID: "node-1",
    transporter: "file://./nodes.json"
};

nodes.json

[
    "127.0.0.1:6001/client-1",
    "127.0.0.1:7001/server-1",
    "127.0.0.1:7002/server-2"
]

注意,不需要所有的遠端節點,只要至少有一個節點上線即可。例如創建一個無服務的節點,它僅作為 gossiper 而不做任何事,只是為了使用 gossip 協定分享其它節點的地址,但所有的節點都必須知道 gossiper 節點的地址,才能夠與其它所有節點進行通訊。

NATS Transporter

NATS 是一個敏捷、高效又彈性的開源訊息傳遞系統,適合用於邊緣運算、雲原生、混合部署的服務架構[3] 。

使用前請安裝 NATS 套件 npm install nats --save

範例:快速使用,預設連線為 nats://localhost:4222

moleculer.config.js

module.exports = {
    transporter: "NATS"
};

範例:連線到遠端 NATS 服務器

moleculer.config.js

module.exports = {
    transporter: "nats://user:pass@nats-server:4222"
};

範例:選項設定方式

moleculer.config.js

module.exports = {
    transporter: {
        type: "NATS",
        options: {
            servers: ["nats://localhost:4222"],
            user: "admin",
            pass: "password"
        }
    }
};

範例: TLS 設定方式

參數請參閱: https://github.com/nats-io/nats.js#tlsoptions

moleculer.config.js

const fs = require("fs");

module.exports = {
    transporter: {
        type: "NATS",
        options: {
            servers: ["nats://localhost:4222"]
            tls: {
                key: fs.readFileSync('./client-key.pem'),
                cert: fs.readFileSync('./client-cert.pem'),
                ca: [ fs.readFileSync('./ca.pem') ]
            }
        }
    }
};

Redis Transporter

Redis 是一個開源的訊息 Broker[4] 。

使用前請安裝 ioredis 套件 npm install ioredis --save

範例:快速使用,預設連線為 redis://localhost:6379

moleculer.config.js

module.exports = {
    transporter: "Redis"
};

範例:連線到 Redis 服務器

moleculer.config.js

module.exports = {
    transporter: "redis://localhost:6379"
};

範例:安全連線到 Redis 服務器

moleculer.config.js

module.exports = {
    transporter: "rediss://localhost:6379"
};

範例:選項設定方式

moleculer.config.js

module.exports = {
    transporter: {
        type: "Redis",
        options: {
            host: "redis-server",
            db: 0
        }
    }
};

範例:Redis 多個節點集群

moleculer.config.js

module.exports = {
    transporter: {
        type: "Redis",
        options: {
            cluster: {
                nodes: [
                    { host: "127.0.0.1", port: 6380 },
                    { host: "127.0.0.1", port: 6381 }
                ]
            }
        }
    }
};

MQTT Transporter

MQTT 是一個物聯網訊息傳輸標準[5] 。例如使用 Eclipse Mosquitto 這套輕量級的開源 MQTT 訊息 Broker [6] 。

使用前請安裝 MQTT 套件 npm install mqtt --save

範例:快速使用,預設連線為 mqtt://localhost:1883

moleculer.config.js

module.exports = {
    transporter: "MQTT"
};

範例:連線到遠端 MQTT 服務器

moleculer.config.js

module.exports = {
    transporter: "mqtt://mqtt-server:1883"
};

範例:安全連線到遠端 MQTT 服務器

moleculer.config.js

module.exports = {
    transporter: "mqtts://mqtt-server:1883"
};

範例:選項設定方式

moleculer.config.js

module.exports = {
    transporter: {
        type: "MQTT",
        options: {
            host: "mqtt-server",
            port: 1883,
			// QoS 等級,詳情請參閱: https://github.com/mqttjs/MQTT.js#about-qos
            qos: 0,
            // 發送主題分割符號
            topicSeparator: "."
        }
    }
};

AMQP (0.9) Transporter

AMQP 0.9 是一個訊息傳輸標準[7] 。例如使用 RabbitMQ [8]。

使用前請安裝 amqplib 套件 npm install amqplib --save

範例:快速使用,預設連線為 amqp://guest:guest@localhost:5672

moleculer.config.js

module.exports = {
    transporter: "AMQP"
});

範例:連線到遠端 AMQP 服務器

moleculer.config.js

module.exports = {
    transporter: "amqp://rabbitmq-server:5672"
};

範例:安全連線到遠端 AMQP 服務器

moleculer.config.js

module.exports = {
    transporter: "amqps://rabbitmq-server:5672"
};

範例:選項設定方式

moleculer.config.js

module.exports = {
    transporter: {
        type: "AMQP",
        options: {
            url: "amqp://user:pass@rabbitmq-server:5672",
            eventTimeToLive: 5000,
            prefetch: 1,
            socketOptions: {
                servername: process.env.RABBIT_SERVER_NAME
            }
            // 設定後隊列若在 2 分鐘內(預設值)沒有任何客戶端連線將會被自動刪除
            autoDeleteQueues: true
        }
    }
};

AMQP 1.0 Transporter

AMQP 1.0 是一個訊息傳輸標準[7] 。例如使用 ActiveMq[9] 或 RabbitMQ [8] + rabbitmq_amqp1_0 Plugin[10] 。

注意,這是一個實驗性 Transporter ,請不要在生產環境使用它!

使用前請安裝 rhea-promise 套件 npm install rhea-promise --save

範例:快速使用,預設連線為 amqp10://guest:guest@localhost:5672

moleculer.config.js

module.exports = {
    transporter: "AMQP10"
});

範例:連線到遠端 AMQP 服務器

moleculer.config.js

module.exports = {
    transporter: "amqp10://activemq-server:5672"
};

範例:選項設定方式

moleculer.config.js

module.exports = {
    transporter: {
        url: "amqp10://user:pass@activemq-server:5672",
        eventTimeToLive: 5000,
        heartbeatTimeToLive: 5000,
        connectionOptions: { // rhea connection options https://github.com/amqp/rhea#connectoptions, example:
            ca: "", // (if using tls)
            servername: "", // (if using tls)
            key: "", // (if using tls with client auth)
            cert: "" // (if using tls with client auth)
        },
        queueOptions: {}, // rhea queue options https://github.com/amqp/rhea#open_receiveraddressoptions
        topicOptions: {}, // rhea queue options https://github.com/amqp/rhea#open_receiveraddressoptions
        messageOptions: {}, // rhea message specific options https://github.com/amqp/rhea#message
        topicPrefix: "topic://", // RabbitMq uses '/topic/' instead, 'topic://' is more common
        prefetch: 1
    }
};

Kafka Transporter

Kafka 是一個開源的分散式事件串流平台[11] 。

使用前請安裝 kafka-node 套件 npm install kafka-node --save

範例:連線至 Kafka 。

由於 Kafka 3.3 版以上會將 KRaft 模式標記為可生產使用,也預計在 3.4 版棄用 ZooKeeper 模式,因此你也可以提早試試不依賴 ZooKeeper 的新模式。

moleculer.config.js

module.exports = {
    transporter: "kafka://192.168.51.29:2181"
};

範例:選項設定方式

moleculer.config.js

module.exports = {
    transporter: {
        type: "kafka",
        options: {
            host: "192.168.51.29:2181",

            // KafkaClient 選項,請參閱: https://github.com/SOHU-Co/kafka-node#kafkaclient
            client: {
                zkOptions: undefined,
                noAckBatchOptions: undefined,
                sslOptions: undefined
            },

            // KafkaProducer 選項,請參閱: https://github.com/SOHU-Co/kafka-node#producerkafkaclient-options-custompartitioner
            producer: {},
            customPartitioner: undefined,

            // ConsumerGroup 選項,請參閱: https://github.com/SOHU-Co/kafka-node#consumergroupoptions-topics
            consumer: {
            },

            // 進階 `send` 選項,請參閱: https://github.com/SOHU-Co/kafka-node#sendpayloads-cb
            publish: {
                partition: 0,
                attributes: 0
            }               
        }
    }
};

NATS Streaming (STAN) Transporter

由於官方將棄用此方法,筆者就不做介紹了。

https://docs.nats.io/legacy/stan/intro

客製化 Transporter

你也可以建立客製化的 Transporter 模組,官方建議可以參考 NATS Transporter 的原始碼[13] 來修改,再實作 connectdisconnectsubscribesend 方法。

範例:建立客製化的 Transporter

my-transporter.js

const BaseTransporter = require("moleculer").Transporters.Base;

class MyTransporter extends BaseTransporter {
    connect() { /*...*/ }
    disconnect() { /*...*/ }
    subscribe() { /*...*/ }
    send() { /*...*/ }
}

module.exports = MyTransporter;

範例:使用客製化的 Transporter

moleculer.config.js

const MyTransporter = require("./my-transporter");

module.exports = {
    transporter: new MyTransporter()
};

禁用平衡器

由於一些 Transporter 已內建了平衡器的解決方案 (例如 RabbitMQNATS ),因此你可以設定 disableBalancer: true 來禁用 Moleculer 的平衡器。

moleculer.config.js

module.exports = {
    disableBalancer: true,
    transporter: "nats://some-server:4222"
};

注意,如果禁用了 Moleculer 內建的平衡器方案,所有的請求事件都將會透過 Transporter 傳輸,包含 本地服務 請求也會透過 Transporter 發送。

序列化

Transporter 需要一個序列化模組來 序列化反序列化 欲傳輸的資料封包, Moleculer 內建了許多序列化器,其中預設的序列化器是 JSON

範例:JSON 序列化器

moleculer.config.js

module.exports = {
    nodeID: "server-1",
    transporter: "NATS",
    serializer: "JSON"
};

範例:Avro 序列化器[14]

使用前請安裝 avsc 套件 npm install avsc --save

moleculer.config.js

module.exports = {
    serializer: "Avro"
};

範例:MsgPack 序列化器[15]

使用前請安裝 msgpack5 套件 npm install msgpack5 --save

moleculer.config.js

module.exports = {
    serializer: "MsgPack"
};

範例:Notepack 序列化器[16]

使用前請安裝 notepack 套件 npm install notepack.io --save

moleculer.config.js

module.exports = {
    serializer: "Notepack"
};

範例:ProtoBuf 序列化器[17]

使用前請安裝 protobufjs 套件 npm install protobufjs --save

moleculer.config.js

module.exports = {
    serializer: "ProtoBuf"
};

範例:Thrift 序列化器[18]

使用前請安裝 Thrift 套件 npm install thrift --save

moleculer.config.js

module.exports = {
    serializer: "Thrift"
};

範例:CBOR 序列化器[19]

CBOR (cbor-x) 是一個非常快的序列化器。

使用前請安裝 cbor-x 套件 npm install cbor-x --save

moleculer.config.js

module.exports = {
    serializer: "CBOR"
};

客製化序列化器

你可以建立客製化序列化模組,官方建議可以參考 JSON 序列化器的原始碼[20] 來修改,再實作 serializedeserialize 方法。

範例:建立客製化的序列化器

my-serializer

const BaseSerializer = require("moleculer").Serializers.Base;

class MySerializer extends BaseSerializer {
    serialize(obj, type) { /*...*/ }
    deserialize(buf, type) { /*...*/ }
}

module.exports = MySerializer;

範例:使用客製化的序列化器

moleculer.config.js

const MySerializer = require("./my-serializer");

module.exports = {
    serializer: new MySerializer()
};

參考文獻

[1] Networking, https://moleculer.services/docs/0.14/networking.html
[2] Gossip protocol, https://en.wikipedia.org/wiki/Gossip_protocol
[3] NATS, https://nats.io/
[4] Redis, https://redis.io/
[5] MQTT, https://mqtt.org/
[6] Eclipse Mosquitto, https://mosquitto.org/
[7] AMQP, https://www.amqp.org/
[8] RabbitMQ, https://www.rabbitmq.com/
[9] ActiveMq, https://activemq.apache.org/
[10] AMQP 1.0 support for RabbitMQ, https://github.com/rabbitmq/rabbitmq-amqp1.0
[11] Apache Kafka, https://kafka.apache.org/
[12] STAN, https://docs.nats.io/legacy/stan/intro
[13] Moleculer NATS Transporter, https://github.com/moleculerjs/moleculer/blob/master/src/transporters/nats.js
[14] Avro, https://github.com/mtth/avsc
[15] MsgPack, https://github.com/mcollina/msgpack5
[16] notepack, https://github.com/darrachequesne/notepack
[17] ProtoBuf, https://developers.google.com/protocol-buffers/
[18] Apache Thrift, https://thrift.apache.org/
[19] cbor-x, https://github.com/kriszyp/cbor-x
[20] JSON serializer, https://github.com/moleculerjs/moleculer/blob/master/src/serializers/json.js

家家酒小劇場

  • Otter - 今天的內容似乎又更多了 ...
  • Boxy - 跟昨天一樣只是看起來很多,你可以在單機建立兩個節點(不同 Port),再使用 NATS 搭配 Docker 快速建立服務來試試看效果。

上一篇
Day 13 : Middlewares
下一篇
Day 15 : 註冊與探索
系列文
Moleculer 家家酒31
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言